package com.atguigu.flink.sql.other;

import org.apache.flink.connector.jdbc.catalog.MySqlCatalog;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

import java.util.Arrays;


/**

 *  JDBCCatalog： 读取JDBC设备中已经建好的库和表。
 *      目前只支持 mysql 和 postgresql
 *
 */
public class Demo2_JDBCCatalog
{
    public static void main(String[] args) {

        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);

        //1.创建MysqlCatalog
        MySqlCatalog mySqlCatalog = new MySqlCatalog(
            Demo2_JDBCCatalog.class.getClassLoader(),
            "mysql",
            "Mybatis",
            "root",
            "000000",
            "jdbc:mysql://hadoop102:3306"
        );

        //2.注册
        tableEnvironment.registerCatalog("a",mySqlCatalog);

        //3.切换
        tableEnvironment.useCatalog("a");

        //列出当前目录下所有的表
        System.out.println(Arrays.toString(tableEnvironment.listTables()));

        //可以直接读Mysql中的表
        tableEnvironment.sqlQuery("select * from emp ").execute().print();

        //建表，把当前要建的表的元数据存入Mysql，是不可能实现的
        String createTableSql = " create table t1 ( id STRING, ts BIGINT , vc INT " +
            "    )WITH (" +
            "  'connector' = 'filesystem'," +
            "  'path' = 'data/sensor.json'," +
            "  'format' = 'json'" +
            ") ";

        tableEnvironment.executeSql(createTableSql);

    }
}
